Digdag + EmbulkによるTSVファイルのS3→Redshiftロード #digdag
はじめに
こんにちは、yokatsukiです。
先日6月18日、第五回ゲームサーバ勉強会に参加してきました。
そこで、トレジャーデータのサポートエンジニアマネージャー高橋様から、直前の6月15日にオープンソース化されたばかりのDigdagの説明がありました。その時の発表スライドは下記です。
Digdagは弊社でも何名かが既に触ってブログで公開(下記)しているので、名前と目的は知ってましたが、説明とデモを見るうちに自分でも試してみたくなりました。
- Embulk界隈で話題になっている分散ワークフローエンジン「DigDag」について調べてみた #digdag
- Treasure Data社のOSSワークフローエンジン『Digdag』を試してみた #digdag|Developers.IO
という訳で、簡単ではありますが共有します。
テーブル定義の無いTSVファイルをRedshiftへロードする
通常、TSVファイルをRedshiftにロードする為には、事前にRedshift上にテーブルを作成しておく必要があります。この時のテーブル設計作業は思った以上に面倒なものです。事前にテーブル定義書があったとしても、Redshiftに適応するための編集は必要になりますし、テーブル定義書が実際のテーブル構造と異なっていた、というのもあるあるだと思います。
この作業を軽減してくれるのが、Embulkでテーブルのカラム定義を推測してくれるguess機能なのですが、これも実行するには下記の手順が必要になります。
- 定義ファイルを作成する為に必要なリソースへのアクセス情報をymlで記述する
- 上記ファイルをguessコマンドで読み込み、定義ファイルを生成する
- runコマンドで定義ファイルを読み込み、テーブルの作成とファイル内容のロードを実行する
こちらの手順をDigdagでまとめ、コマンド一発で実行できるようにしました。
実行環境作成
AWS環境準備
動作を確認するために、AWS環境では以下の準備を行いました。
- IAMユーザ、アクセスキー発行(cm-katsuki.yosuke)
- データ格納用S3バケット作成(cm-embulk-source-data-yokatsuki)
- 一時作業用S3バケット作成(cm-embulk-temp-yokatsuki)
- VPC, Subunet設定(10.0.0.0/24)
- Redshiftクラスタ起動(dc1.large)
- Embulk, Digdag実行用EC2インスタンス起動(t2.micro)
Digdag、Embulk準備
起動したEC2インスタンスに、Digdag、Embulkの実行環境を準備しました。
JDKインストール
事前にオラクル社のページからRPMパッケージをダウンロードしておきます。
$ sudo rpm -ivh jdk-8u92-linux-x64.rpm
Digdagインストール
$ curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" $ chmod +x /usr/local/bin/digdag
Embulkインストール
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar" $ chmod +x ~/.embulk/bin/embulk
なお、DigdagやEmbulkには、PATHを通しておきました。
Embulkプラグインインストール
S3からファイルを取得し、Redshiftに格納するそれぞれのプラグインを追加しました。
$ embulk gem install embulk-input-s3 $ embulk gem install embulk-output-redshift
結果、以下の構成ができあがりました。
データの準備
データ準備
ロード対象データとして、Tableauユーザお馴染みのSuperstoreサンプルを使用しました。 これはExcelファイルなので、Ordersシートの内容を、タブ区切りのテキストファイルとして保存します。また、Embulkでは見出し(先頭行)を自動的にテーブルのカラム名に設定することができるのですが、現在のRedshiftでは日本語カラムが定義できないので、「製品名」等の見出しを「product_name」等の英語表記に変えておきます。
Digdagプロジェクト作成
initコマンドでプロジェクトを作成します。
$ digdag init tsvload 2016-06-22 08:45:42 +0000: Digdag v0.8.2 Creating tsvload/.gitignore Creating tsvload/tasks/shell_sample.sh Creating tsvload/tasks/repeat_hello.sh Creating tsvload/tasks/__init__.py Creating tsvload/tsvload.dig Done. Type `cd tsvload` and then `digdag run tsvload.dig` to run the workflow. Enjoy!
作成されたプロジェクトにはtasksサブディレクトリが作られ、その中にサンプルが用意されますが、これは削除しても大丈夫です。
ワークフロー記述
Digdagのドキュメントを参考にしながら、tsvload.digファイルにタスクを記述します。
timezone: UTC +guess: sh>: embulk guess embulk/guess-redshift-output.yml -o embulk/config-redshift-output.yml +load: embulk>: embulk/config-redshift-output.yml
内容を簡単に説明すると、まずguessタスクで、シェルコマンドとしてembulkのguessを実行します。読み取る設定ファイルや結果の設定ファイルはembulkサブディレクトリ以下に置きます。次にloadタスクでembulkのロードを実行します。
どのような技術を使ってタスクを実行するか、は各処理の先頭に記述する「オペレータ」と呼ばれる識別子で決定するのですが、embulkオペレータではguessを実行する事ができないようなので、シェルオペレータを使用しました。
Embulk設定ファイル記述
上記ワークフローの記述に沿うよう、embuklサブディレクトリを作り、その下にEmbulkでguessに使用する設定ファイルを記述します。
in: type: s3 bucket: cm-embulk-source-data-yokatsuki path_prefix: Sample access_key_id: xxxxxx secret_access_key: xxxxxx out: type: redshift host: myredshift.xxxxxx.us-east-1.redshift.amazonaws.com user: xxxxxx password: xxxxxx database: xxxxxx table: orders_test_embulk access_key_id: xxxxxx secret_access_key: xxxxxx iam_user_name: cm-katsuki.yosuke s3_bucket: cm-embulk-temp-yokatsuki s3_key_prefix: temp/redshift mode: insert
ここまでで、Digdagプロジェクト以下の構造は以下の通りになります。
tsvload(プロジェクトディレクトリ) ├── embulk │ └── guess-redshift-output.yml └── tsvload.dig
Digdagワークフロー実行
ここまで準備できれば後は実行するだけです。プロジェクトディレクトリ下で、以下のコマンドを実行し、ワークフローを開始します。
$ digdag run tsvload.dig
以下のメッセージが出力され、タスクが実行されていきます。
2016-06-22 09:18:10 +0000: Digdag v0.8.2 2016-06-22 09:18:11 +0000 [WARN] (main): Using a new session time 2016-06-22T00:00:00+00:00. 2016-06-22 09:18:11 +0000 [INFO] (main): Using session .digdag/status/20160622T000000+0000. 2016-06-22 09:18:11 +0000 [INFO] (main): Starting a new session project id=1 workflow name=tsvload session_time=2016-06-22T00:00:00+00:00 2016-06-22 09:18:12 +0000 [INFO] (0017@+tsvload+guess): sh>: embulk guess embulk/guess-redshift-output.yml -o embulk/config-redshift-output.yml 2016-06-22 09:18:15.741 +0000: Embulk v0.8.9 2016-06-22 09:18:17.002 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-s3 (0.2.8) 2016-06-22 09:18:18.026 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path 2016-06-22 09:18:18.056 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path 2016-06-22 09:18:18.074 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path 2016-06-22 09:18:18.082 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path in: type: s3 bucket: cm-embulk-source-data-yokatsuki path_prefix: Sample access_key_id: xxxxxx secret_access_key: xxxxxx parser: charset: MS932 newline: CRLF type: csv delimiter: "\t" quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: order_id, type: long} - {name: order_date, type: timestamp, format: '%Y/%m/%d'} - {name: order_priority, type: string} - {name: order_quantity, type: long} - {name: sales, type: string} - {name: discount, type: string} - {name: ship_mode, type: string} - {name: profit, type: long} - {name: unit_price, type: long} - {name: advertising_expenses, type: string} - {name: shipping_cost, type: long} - {name: customer_name, type: string} - {name: prefecture, type: string} - {name: city, type: string} - {name: region, type: string} - {name: shop_name, type: string} - {name: customer_segment, type: string} - {name: product_category, type: string} - {name: product_sub_category, type: string} - {name: product_id, type: string} - {name: product_name, type: string} - {name: product_description, type: string} - {name: product_container, type: string} - {name: product_base_margin, type: double} - {name: supplier, type: string} - {name: delivery_date, type: timestamp, format: '%Y/%m/%d'} - {name: ship_date, type: timestamp, format: '%Y/%m/%d'} out: {type: redshift, host: myredshift.xxxxxx.us-east-1.redshift.amazonaws.com, user: xxxxxx, password: xxxxxx, database: xxxxxx, table: orders_test_embulk, access_key_id: xxxxxx, secret_access_key: xxxxxx, iam_user_name: cm-katsuki.yosuke, s3_bucket: cm-embulk-temp-yokatsuki, s3_key_prefix: temp/redshift, mode: insert} Created 'embulk/config-redshift-output.yml' file. 2016-06-22 09:18:20 +0000 [INFO] (0017@+tsvload+load): embulk>: embulk/config-redshift-output.yml 2016-06-22 09:18:26.427 +0000: Embulk v0.8.9 2016-06-22 09:18:29.312 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-s3 (0.2.8) 2016-06-22 09:18:29.406 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0) 2016-06-22 09:18:30.704 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1 2016-06-22 09:18:30.746 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-06-22 09:18:30.861 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-06-22 09:18:30.865 +0000 [INFO] (0001:transaction): > 0.00 seconds 2016-06-22 09:18:30.865 +0000 [INFO] (0001:transaction): Using insert mode 2016-06-22 09:18:30.913 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "orders_test_0bbdfb40_bl_tmp000" 2016-06-22 09:18:30.917 +0000 [INFO] (0001:transaction): > 0.00 seconds 2016-06-22 09:18:31.110 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "orders_test_0bbdfb40_bl_tmp000" ("order_id" BIGINT, "order_date" TIMESTAMP, "order_priority" VARCHAR(65535), "order_quantity" BIGINT, "sales" VARCHAR(65535), "discount" VARCHAR(65535), "ship_mode" VARCHAR(65535), "profit" BIGINT, "unit_price" BIGINT, "advertising_expenses" VARCHAR(65535), "shipping_cost" BIGINT, "customer_name" VARCHAR(65535), "prefecture" VARCHAR(65535), "city" VARCHAR(65535), "region" VARCHAR(65535), "shop_name" VARCHAR(65535), "customer_segment" VARCHAR(65535), "product_category" VARCHAR(65535), "product_sub_category" VARCHAR(65535), "product_id" VARCHAR(65535), "product_name" VARCHAR(65535), "product_description" VARCHAR(65535), "product_container" VARCHAR(65535), "product_base_margin" DOUBLE PRECISION, "supplier" VARCHAR(65535), "delivery_date" TIMESTAMP, "ship_date" TIMESTAMP) 2016-06-22 09:18:31.122 +0000 [INFO] (0001:transaction): > 0.01 seconds 2016-06-22 09:18:31.673 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 2016-06-22 09:18:31.722 +0000 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-06-22 09:18:31.961 +0000 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-06-22 09:18:31.964 +0000 [INFO] (0017:task-0000): > 0.00 seconds 2016-06-22 09:18:31.964 +0000 [INFO] (0017:task-0000): Copy SQL: COPY "orders_test_0bbdfb40_bl_tmp000" ("order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF 2016-06-22 09:18:41.730 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 to S3 (773,477 bytes 8,369 rows) 2016-06-22 09:18:42.093 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 (0.36 seconds) 2016-06-22 09:18:42.101 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public" 2016-06-22 09:18:42.103 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds 2016-06-22 09:18:42.103 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 2016-06-22 09:18:43.280 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 (1.05 seconds for COPY) 2016-06-22 09:18:43.300 +0000 [INFO] (0017:task-0000): Loaded 1 files. 2016-06-22 09:18:43.302 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2016-06-22 09:18:43.302 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800} 2016-06-22 09:18:43.308 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-06-22 09:18:43.311 +0000 [INFO] (0001:transaction): > 0.00 seconds 2016-06-22 09:18:43.323 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "orders_test_embulk" ("order_id" BIGINT, "order_date" TIMESTAMP, "order_priority" VARCHAR(65535), "order_quantity" BIGINT, "sales" VARCHAR(65535), "discount" VARCHAR(65535), "ship_mode" VARCHAR(65535), "profit" BIGINT, "unit_price" BIGINT, "advertising_expenses" VARCHAR(65535), "shipping_cost" BIGINT, "customer_name" VARCHAR(65535), "prefecture" VARCHAR(65535), "city" VARCHAR(65535), "region" VARCHAR(65535), "shop_name" VARCHAR(65535), "customer_segment" VARCHAR(65535), "product_category" VARCHAR(65535), "product_sub_category" VARCHAR(65535), "product_id" VARCHAR(65535), "product_name" VARCHAR(65535), "product_description" VARCHAR(65535), "product_container" VARCHAR(65535), "product_base_margin" DOUBLE PRECISION, "supplier" VARCHAR(65535), "delivery_date" TIMESTAMP, "ship_date" TIMESTAMP) 2016-06-22 09:18:43.451 +0000 [INFO] (0001:transaction): > 0.13 seconds 2016-06-22 09:18:43.640 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "orders_test_embulk" ("order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date") SELECT "order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date" FROM "orders_test_0bbdfb40_bl_tmp000" 2016-06-22 09:18:43.692 +0000 [INFO] (0001:transaction): > 0.05 seconds (8,369 rows) 2016-06-22 09:18:44.567 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-06-22 09:18:44.576 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-06-22 09:18:44.578 +0000 [INFO] (0001:transaction): > 0.00 seconds 2016-06-22 09:18:44.578 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "orders_test_0bbdfb40_bl_tmp000" 2016-06-22 09:18:44.808 +0000 [INFO] (0001:transaction): > 0.23 seconds 2016-06-22 09:18:44.817 +0000 [INFO] (main): Committed. 2016-06-22 09:18:44.821 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"Sample - Superstore Sales Japan-2013(revised).txt"},"out":{}} Success. Task state is saved at .digdag/status/20160622T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
SQL Workbench/JでRedshiftにログインし、テーブルがロードされていることを確認しました。Embulkのお陰で、文字コードもMS932からUTF-8へ自動変換されており、文字化けは発生していません。
まとめと今後
Digdagを使って、Embulk処理自動化の第一歩として、ざっくりとではありますが、guessとrunをまとめて実行することができました。事前準備は色々ありましたが、結局手作業で記述したファイルは、.digファイルとguess用の.ymlファイルのたった2つだけでした。これだけで
今後、以下についても試していきます。
- 変数の活用
- 複数テーブルへの対応
- ロード失敗時の処理
- 定期実行
- 並列実行
それでは、また。